# uncompyle6 version 3.2.3
# Python bytecode 3.6 (3379)
# Decompiled from: Python 3.6.8 |Anaconda custom (64-bit)| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
# Embedded file name: site-packages\gevent\threadpool.py
from __future__ import absolute_import
import sys, os
from gevent._compat import integer_types
from gevent.hub import get_hub, getcurrent, sleep, _get_hub
from gevent.event import AsyncResult
from gevent.greenlet import Greenlet
from gevent.pool import GroupMappingMixin
from gevent.lock import Semaphore
from gevent._threading import Lock, Queue, start_new_thread

__all__ = ["ThreadPool", "ThreadResult"]


class ThreadPool(GroupMappingMixin):
    """
    A pool of native worker threads fed from a shared task queue.

    Tasks submitted with :meth:`spawn` are put on ``task_queue`` and picked
    up by worker threads that the pool starts on demand, up to ``maxsize``.

    .. note:: The method :meth:`apply_async` will always return a new
       greenlet, bypassing the threadpool entirely.
    """

    def __init__(self, maxsize, hub=None):
        """
        Create the pool.

        :param maxsize: Upper bound on the number of worker threads; must be
            a non-negative integer (validated by :meth:`_set_maxsize`).
        :param hub: Hub to attach to; defaults to ``get_hub()``.
        """
        if hub is None:
            hub = get_hub()
        self.hub = hub
        # Must exist before _init(): _set_maxsize() adjusts the semaphore
        # counter by the difference from the previous maxsize.
        self._maxsize = 0
        self.manager = None
        self.pid = os.getpid()
        self.fork_watcher = hub.loop.fork(ref=False)
        self._init(maxsize)

    def _set_maxsize(self, maxsize):
        """
        Change the maximum number of worker threads.

        :raises TypeError: if *maxsize* is not an integer.
        :raises ValueError: if *maxsize* is negative.
        """
        if not isinstance(maxsize, integer_types):
            raise TypeError("maxsize must be integer: %r" % (maxsize,))
        if maxsize < 0:
            raise ValueError("maxsize must not be negative: %r" % (maxsize,))
        # Shift the semaphore counter by the change in capacity.
        difference = maxsize - self._maxsize
        self._semaphore.counter += difference
        self._maxsize = maxsize
        self.adjust()
        # NOTE(review): presumably wakes spawn() callers blocked in
        # acquire() so they notice the new counter; `_start_notify` is a
        # gevent-internal Semaphore API -- confirm.
        self._semaphore._start_notify()

    def _get_maxsize(self):
        """Return the configured maximum number of worker threads."""
        return self._maxsize

    maxsize = property(_get_maxsize, _set_maxsize)

    def __repr__(self):
        # Format: <ClassName at 0xADDR unfinished_tasks/size/maxsize>
        return "<%s at 0x%x %s/%s/%s>" % (
            self.__class__.__name__,
            id(self),
            len(self),
            self.size,
            self.maxsize,
        )

    def __len__(self):
        """Return the task queue's count of unfinished tasks."""
        return self.task_queue.unfinished_tasks

    def _get_size(self):
        """Return the current worker-thread count."""
        return self._size

    def _set_size(self, size):
        """
        Grow or shrink the pool to exactly *size* worker threads.

        :raises ValueError: if *size* is negative or exceeds ``maxsize``.
        """
        if size < 0:
            raise ValueError("Size of the pool cannot be negative: %r" % (size,))
        if size > self._maxsize:
            raise ValueError(
                "Size of the pool cannot be bigger than maxsize: %r > %r"
                % (size, self._maxsize)
            )
        # An explicit resize supersedes any background adjusting greenlet.
        if self.manager:
            self.manager.kill()
        while self._size < size:
            self._add_thread()

        delay = 0.0001
        while self._size > size:
            # A ``None`` task makes the worker that dequeues it exit
            # (see _worker); queue as many as there are surplus threads.
            while self._size - size > self.task_queue.unfinished_tasks:
                self.task_queue.put(None)

            # When called from the hub greenlet itself, do not wait for the
            # workers to exit (presumably because sleeping there is not
            # possible -- confirm).
            if getcurrent() is self.hub:
                break
            sleep(delay)
            delay = min(delay * 2, 0.05)

        # The fork watcher is only needed while there are threads.
        if self._size:
            self.fork_watcher.start(self._on_fork)
        else:
            self.fork_watcher.stop()

    size = property(_get_size, _set_size)

    def _init(self, maxsize):
        """(Re)create the per-process state: size, semaphore, lock, queue."""
        self._size = 0
        self._semaphore = Semaphore(1)
        self._lock = Lock()
        self.task_queue = Queue()
        self._set_maxsize(maxsize)

    def _on_fork(self):
        """Fork-watcher callback: rebuild pool state in a new process."""
        pid = os.getpid()
        if pid != self.pid:
            self.pid = pid
            self._init(self._maxsize)

    def join(self):
        """Waits until all outstanding tasks have been completed."""
        # Poll with exponential back-off, capped at 50 ms.
        delay = 0.0005
        while self.task_queue.unfinished_tasks > 0:
            sleep(delay)
            delay = min(delay * 2, 0.05)

    def kill(self):
        """Shrink the pool to zero worker threads."""
        self.size = 0

    def _adjust_step(self):
        """Run one pass of growing/shrinking the pool toward its targets."""
        # Grow only while there is BOTH spare capacity and a backlog larger
        # than the number of threads.  Both tests must be part of the loop
        # condition: with the backlog test nested inside the loop body, the
        # loop never terminates when capacity is free but nothing is queued
        # (e.g. right after construction).
        while (
            self._size < self._maxsize
            and self.task_queue.unfinished_tasks > self._size
        ):
            self._add_thread()

        # Shrink: queue one ``None`` (worker-exit sentinel) per surplus
        # thread not already accounted for by an unfinished task.
        while self._size - self._maxsize > self.task_queue.unfinished_tasks:
            self.task_queue.put(None)

        if self._size:
            self.fork_watcher.start(self._on_fork)
        else:
            self.fork_watcher.stop()

    def _adjust_wait(self):
        """Repeat :meth:`_adjust_step` until size is within ``maxsize``."""
        delay = 0.0001
        while True:
            self._adjust_step()
            if self._size <= self._maxsize:
                return
            sleep(delay)
            delay = min(delay * 2, 0.05)

    def adjust(self):
        """
        Adjust the pool once; if it is still over ``maxsize`` and no manager
        greenlet exists, spawn one running :meth:`_adjust_wait`.
        """
        self._adjust_step()
        if not self.manager:
            if self._size > self._maxsize:
                self.manager = Greenlet.spawn(self._adjust_wait)

    def _add_thread(self):
        """Start one worker thread, counting it before it starts."""
        with self._lock:
            self._size += 1
        try:
            start_new_thread(self._worker, ())
        except:
            # Undo the optimistic increment for any failure, then re-raise.
            with self._lock:
                self._size -= 1
            raise

    def spawn(self, func, *args, **kwargs):
        """
        Add a new task to the threadpool that will run ``func(*args, **kwargs)``.

        Waits until a slot is available. Creates a new thread if necessary.

        :return: A :class:`gevent.event.AsyncResult`.
        """
        # _init() may replace self._semaphore while we are blocked in
        # acquire(); retry until the one we acquired is still current.
        while 1:
            semaphore = self._semaphore
            semaphore.acquire()
            if semaphore is self._semaphore:
                break

        thread_result = None
        try:
            task_queue = self.task_queue
            result = AsyncResult()
            # The ThreadResult releases the semaphore when the task is done.
            thread_result = ThreadResult(
                result, hub=self.hub, call_when_ready=semaphore.release
            )
            task_queue.put((func, args, kwargs, thread_result))
            self.adjust()
        except:
            # Give back the slot and tear down the watcher on any failure.
            if thread_result is not None:
                thread_result.destroy()
            semaphore.release()
            raise

        return result

    def _decrease_size(self):
        """Decrement the thread count under the lock, if still possible."""
        # Module globals can be None here (presumably during interpreter
        # shutdown); in that case do nothing.
        if sys is None:
            return
        _lock = getattr(self, "_lock", None)
        if _lock is not None:
            with _lock:
                self._size -= 1

    # When true, a worker destroys its thread's hub (if any) on exit.
    _destroy_worker_hub = False

    def _worker(self):
        """
        Worker-thread main loop: run queued tasks until a ``None`` sentinel
        is dequeued, reporting each outcome through the task's ThreadResult.
        """
        need_decrease = True
        try:
            while True:
                task_queue = self.task_queue
                task = task_queue.get()
                try:
                    if task is None:
                        # Exit request.  Decrease the size here, before the
                        # enclosing ``finally`` marks the task done.
                        need_decrease = False
                        self._decrease_size()
                        return
                    func, args, kwargs, thread_result = task
                    try:
                        value = func(*args, **kwargs)
                    except:
                        # ``sys.exc_info`` may be gone (see _decrease_size).
                        exc_info = getattr(sys, "exc_info", None)
                        if exc_info is None:
                            return
                        thread_result.handle_error((self, func), exc_info())
                    else:
                        if sys is None:
                            return
                        thread_result.set(value)
                        del value
                    finally:
                        # Drop references to the task's objects before
                        # blocking on the queue again.
                        del func
                        del args
                        del kwargs
                        del thread_result
                        del task

                finally:
                    if sys is None:
                        return
                    task_queue.task_done()

        finally:
            if need_decrease:
                self._decrease_size()
            if sys is not None and self._destroy_worker_hub:
                hub = _get_hub()
                if hub is not None:
                    hub.destroy(True)
                del hub

    def apply_e(self, expected_errors, function, args=None, kwargs=None):
        """
        .. deprecated:: 1.1a2
           Identical to :meth:`apply`; the ``expected_errors`` argument is ignored.
        """
        return self.apply(function, args, kwargs)

    def _apply_immediately(self):
        # True when the caller's hub is not the hub this pool belongs to.
        return get_hub() is not self.hub

    def _apply_async_cb_spawn(self, callback, result):
        # Invoke the callback directly in the calling context.
        callback(result)

    def _apply_async_use_greenlet(self):
        # Always true for this pool.
        return True


class ThreadResult(object):
    """
    Carries the outcome of a task from a worker thread back to the hub.

    The worker calls :meth:`set` or :meth:`handle_error`; both store the
    outcome and ``send()`` the loop's async watcher, whose callback
    (:meth:`_on_async`) then hands this object to ``receiver``.

    ``async`` is a reserved word from Python 3.7 on, so the ``"async"`` slot
    and the loop's ``async`` factory are reached through ``getattr`` /
    ``setattr`` here.  The attribute names themselves are unchanged.
    """

    __slots__ = (
        "exc_info",
        "async",
        "_call_when_ready",
        "value",
        "context",
        "hub",
        "receiver",
    )

    def _get_async_watcher(self):
        return getattr(self, "async")

    def _set_async_watcher(self, watcher):
        setattr(self, "async", watcher)

    # Keyword-safe alias for the ``async`` slot.
    _async_watcher = property(_get_async_watcher, _set_async_watcher)

    def __init__(self, receiver, hub=None, call_when_ready=None):
        """
        :param receiver: Callable invoked with this object from
            :meth:`_on_async`; may be ``None``.
        :param hub: Hub whose loop provides the async watcher; defaults to
            ``get_hub()``.
        :param call_when_ready: Optional zero-argument callable invoked
            first thing in :meth:`_on_async`.
        """
        if hub is None:
            hub = get_hub()
        self.receiver = receiver
        self.hub = hub
        self.context = None
        self.value = None
        self.exc_info = ()
        self._async_watcher = getattr(hub.loop, "async")()
        self._call_when_ready = call_when_ready
        self._async_watcher.start(self._on_async)

    @property
    def exception(self):
        """The exception instance, or ``None`` if no error was recorded."""
        if self.exc_info:
            return self.exc_info[1]

    def _on_async(self):
        """Watcher callback: deliver the stored outcome and release state."""
        self._async_watcher.stop()
        if self._call_when_ready:
            self._call_when_ready()
        try:
            if self.exc_info:
                self.hub.handle_error(self.context, *self.exc_info)
            self.context = None
            self._async_watcher = None
            self.hub = None
            self._call_when_ready = None
            if self.receiver is not None:
                self.receiver(self)
        finally:
            self.receiver = None
            self.value = None
            if self.exc_info:
                # Keep type and value, drop the traceback reference.
                self.exc_info = (self.exc_info[0], self.exc_info[1], None)

    def destroy(self):
        """Stop the watcher (if any) and drop all held references."""
        if self._async_watcher is not None:
            self._async_watcher.stop()
        self._async_watcher = None
        self.context = None
        self.hub = None
        self._call_when_ready = None
        self.receiver = None

    def _ready(self):
        # Signal the watcher unless this result was already torn down.
        if self._async_watcher is not None:
            self._async_watcher.send()

    def set(self, value):
        """Record a successful *value* and signal the watcher."""
        self.value = value
        self._ready()

    def handle_error(self, context, exc_info):
        """Record a failure (*context* plus *exc_info*) and signal the watcher."""
        self.context = context
        self.exc_info = exc_info
        self._ready()

    def successful(self):
        """Return True if no exception has been recorded."""
        return self.exception is None


def wrap_errors(errors, function, args, kwargs):
    """
    Call ``function(*args, **kwargs)`` and report the outcome as a pair.

    Returns ``(True, value)`` on success and ``(False, exception)`` when the
    call raises one of *errors*; any other exception propagates.

    .. deprecated:: 1.1a2
       Previously used by ThreadPool.apply_e.
    """
    try:
        outcome = function(*args, **kwargs)
    except errors as caught:
        return False, caught
    return True, outcome


try:
    import concurrent.futures
except ImportError:
    pass
else:
    __all__.append("ThreadPoolExecutor")
    from gevent.timeout import Timeout as GTimeout
    from gevent._util import Lazy
    from concurrent.futures import _base as cfb

    def _wrap_error(future, fn):
        def cbwrap(_):
            del _
            try:
                fn(future)
            except Exception:
                future.hub.print_exception((fn, future), *sys.exc_info())

        cbwrap.auto_unlink = True
        return cbwrap

    def _wrap(future, fn):
        def f(_):
            fn(future)

        f.auto_unlink = True
        return f

    class _FutureProxy(object):
        """
        Wraps an async result object behind the subset of the
        ``concurrent.futures.Future`` interface implemented below; any
        attribute not defined here is looked up on the wrapped object.
        """

        def __init__(self, asyncresult):
            self.asyncresult = asyncresult

        @Lazy
        def _condition(self):
            from gevent import monkey

            # Only available when threading is monkey-patched or the result
            # is already done; otherwise the attribute does not exist.
            if not monkey.is_module_patched("threading") and not self.done():
                raise AttributeError("_condition")

            import threading

            return threading.Condition()

        @Lazy
        def _waiters(self):
            # First access registers the completion hook on the wrapped
            # result and starts with an empty waiter list.
            self.asyncresult.rawlink(self.__when_done)
            return []

        def __when_done(self, _):
            # Notify every registered waiter of success or failure.
            for waiter in self._waiters:
                if self.successful():
                    waiter.add_result(self)
                else:
                    waiter.add_exception(self)

        __when_done.auto_unlink = True

        @property
        def _state(self):
            return cfb.FINISHED if self.done() else cfb.RUNNING

        def set_running_or_notify_cancel(self):
            """No-op."""

        def result(self, timeout=None):
            """
            Return the wrapped result, converting a gevent timeout into
            ``concurrent.futures.TimeoutError``.
            """
            try:
                return self.asyncresult.result(timeout=timeout)
            except GTimeout:
                raise concurrent.futures.TimeoutError()

        def exception(self, timeout=None):
            """
            Wait via the wrapped ``get`` and then return the wrapped
            ``exception`` attribute, converting a gevent timeout into
            ``concurrent.futures.TimeoutError``.
            """
            try:
                self.asyncresult.get(timeout=timeout)
            except GTimeout:
                raise concurrent.futures.TimeoutError()
            else:
                return self.asyncresult.exception

        def add_done_callback(self, fn):
            """Call ``fn(self)`` now if done, otherwise once the result is ready."""
            if not self.done():
                self.asyncresult.rawlink(_wrap_error(self, fn))
                return
            fn(self)

        def rawlink(self, fn):
            """Link *fn* on the wrapped result so that it receives this proxy."""
            self.asyncresult.rawlink(_wrap(self, fn))

        def __str__(self):
            return str(self.asyncresult)

        def __getattr__(self, name):
            # Fallback for anything not defined on the proxy itself.
            return getattr(self.asyncresult, name)

    class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
        """
        A version of :class:`concurrent.futures.ThreadPoolExecutor` that
        always uses native threads, even when threading is monkey-patched.

        The ``Future`` objects returned from this object can be used
        with gevent waiting primitives like :func:`gevent.wait`.

        .. caution:: If threading is *not* monkey-patched, then the ``Future``
           objects returned by this object are not guaranteed to work with
           :func:`~concurrent.futures.as_completed` and :func:`~concurrent.futures.wait`.
           The individual blocking methods like :meth:`~concurrent.futures.Future.result`
           and :meth:`~concurrent.futures.Future.exception` will always work.

        .. versionadded:: 1.2a1
           This is a provisional API.
        """

        def __init__(self, max_workers):
            super(ThreadPoolExecutor, self).__init__(max_workers)
            # Work is run by a ThreadPool whose workers destroy their hub
            # when they exit.
            pool = ThreadPool(max_workers)
            pool._destroy_worker_hub = True
            self._threadpool = pool

        def submit(self, fn, *args, **kwargs):
            """
            Schedule ``fn(*args, **kwargs)`` on the thread pool and return a
            future-like proxy for its result.

            :raises RuntimeError: if the executor has been shut down.
            """
            with self._shutdown_lock:
                if not self._shutdown:
                    spawned = self._threadpool.spawn(fn, *args, **kwargs)
                    return _FutureProxy(spawned)
                raise RuntimeError("cannot schedule new futures after shutdown")

        def shutdown(self, wait=True):
            """Shut down the base executor, then kill and drop the thread pool."""
            super(ThreadPoolExecutor, self).shutdown(wait)
            pool = self._threadpool
            # After a previous shutdown the pool is None and has no ``kill``.
            if getattr(pool, "kill", None):
                pool.kill()
            self._threadpool = None

        kill = shutdown

        def _adjust_thread_count(self):
            """No-op override of the base-class hook."""
